/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.bulkdatagenerator;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.GnuParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
import org.apache.hbase.thirdparty.org.apache.commons.cli.Parser;

/**
 * A command line utility to generate pre-split HBase tables with a large amount (TBs) of random
 * data, equally distributed among all regions.
 * <p>
 * The tool creates the pre-split table, runs a MapReduce job that writes HFiles into a temporary
 * output directory and finally bulk loads the generated HFiles into the table.
 */
public class BulkDataGeneratorTool {

  private static final Logger logger = LoggerFactory.getLogger(BulkDataGeneratorTool.class);

  /**
   * Prefix for the generated HFiles directory
   */
  private static final String OUTPUT_DIRECTORY_PREFIX = "/bulk_data_generator/";

  /**
   * Number of mapper containers to be launched for generating the HFiles
   */
  private int mapperCount;

  /**
   * Number of rows to be generated by each mapper
   */
  private long rowsPerMapper;

  /**
   * Table for which random data needs to be generated
   */
  private String table;

  /**
   * Number of splits for the {@link #table}. Number of regions for the table will be
   * ({@link #splitCount} + 1).
   */
  private int splitCount;

  /**
   * Flag to delete the table (before creating) if it already exists
   */
  private boolean deleteTableIfExist;

  /**
   * Additional HBase meta-data options to be set for the table
   */
  private final Map<String, String> tableOptions = new HashMap<>();

  /**
   * Command line entry point. Exits the JVM with status 1 when {@link #run} reports a failure so
   * that calling scripts can detect it.
   * @param args command line arguments, see {@link #printUsage()}
   * @throws Exception if {@link #run} propagates an error
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    BulkDataGeneratorTool bulkDataGeneratorTool = new BulkDataGeneratorTool();
    if (!bulkDataGeneratorTool.run(conf, args)) {
      System.exit(1);
    }
  }

  /**
   * Parses the arguments, (re)creates the pre-split table, generates the HFiles with a MapReduce
   * job and bulk loads them into the table.
   * @param conf configuration used for the HBase connection, the job and the bulk load; it is
   *             modified in place with the parsed {@code -D} overrides and the job parameters
   * @param args command line arguments
   * @return {@code true} if only help was requested or if the bulk load reported at least one
   *         loaded HFile; {@code false} on argument errors, if the table already exists and must
   *         not be deleted, or on any failure while generating or loading the data
   * @throws IOException              if the file system for {@code conf} cannot be obtained while
   *                                  registering the output directory for clean-up
   * @throws IllegalArgumentException if a mandatory parameter is missing or out of range
   */
  public boolean run(Configuration conf, String[] args) throws IOException {
    // Read CLI arguments
    try {
      Parser parser = new GnuParser();
      CommandLine line = parser.parse(getOptions(), args);
      // Honour the help flag before looking at any other parameter, so that asking for help never
      // fails because some other argument is malformed.
      if (line.hasOption("h")) {
        printUsage();
        return true;
      }
      readCommandLineParameters(conf, line);
    } catch (ParseException | IOException | NumberFormatException exception) {
      // NumberFormatException is raised for non-numeric mapper/split/row counts.
      logger.error("Error while parsing CLI arguments.", exception);
      printUsage();
      return false;
    }

    Preconditions.checkArgument(!StringUtils.isEmpty(table), "Table name must not be empty");
    Preconditions.checkArgument(mapperCount > 0, "Mapper count must be greater than 0");
    Preconditions.checkArgument((splitCount > 0) && (splitCount < Utility.MAX_SPLIT_COUNT),
      "Split count must be greater than 0 and less than " + Utility.MAX_SPLIT_COUNT);
    Preconditions.checkArgument(rowsPerMapper > 0, "Rows per mapper must be greater than 0");

    Path outputDirectory = generateOutputDirectory();
    logger.info("HFiles will be generated at {}", outputDirectory);

    try (Connection connection = ConnectionFactory.createConnection(conf);
      Admin admin = connection.getAdmin()) {
      final TableName tableName = TableName.valueOf(table);
      if (admin.tableExists(tableName)) {
        if (deleteTableIfExist) {
          logger.info(
            "Deleting the table since it already exist and delete-if-exist flag is set to true");
          Utility.deleteTable(admin, table);
        } else {
          logger.info("Table already exists, cannot generate HFiles for existing table.");
          return false;
        }
      }

      // Creating the pre-split table
      Utility.createTable(admin, table, splitCount, tableOptions);
      logger.info("{} created successfully", table);

      Job job = createSubmittableJob(conf);

      // The table handle is only needed while configuring the job, so release it right after.
      try (Table hbaseTable = connection.getTable(tableName)) {
        // Auto configure partitioner and reducer
        HFileOutputFormat2.configureIncrementalLoad(job, hbaseTable,
          hbaseTable.getRegionLocator());
      }

      FileOutputFormat.setOutputPath(job, outputDirectory);

      boolean result = job.waitForCompletion(true);

      if (result) {
        logger.info("HFiles generated successfully. Starting bulk load to {}", table);
        BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(conf);
        Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> bulkLoadedHFiles =
          bulkLoadHFilesTool.bulkLoad(tableName, outputDirectory);
        boolean status = !bulkLoadedHFiles.isEmpty();
        logger.info("BulkLoadHFiles finished successfully with status {}", status);
        return status;
      } else {
        logger.info("Failed to generate HFiles.");
        return false;
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so that callers further up the stack can react to it.
      Thread.currentThread().interrupt();
      logger.error("Failed to generate data", e);
      return false;
    } catch (Exception e) {
      logger.error("Failed to generate data", e);
      return false;
    } finally {
      FileSystem.get(conf).deleteOnExit(outputDirectory);
    }
  }

  /**
   * Creates the MapReduce job which generates the random data.
   * @param conf configuration for the job; the split count, mapper count and rows per mapper are
   *             written into it before the job is created
   * @return the job with input format, mapper and map output types configured
   * @throws IOException if the job cannot be instantiated
   */
  protected Job createSubmittableJob(Configuration conf) throws IOException {

    conf.setInt(BulkDataGeneratorMapper.SPLIT_COUNT_KEY, splitCount);
    conf.setInt(BulkDataGeneratorInputFormat.MAPPER_TASK_COUNT_KEY, mapperCount);
    conf.setLong(BulkDataGeneratorRecordReader.RECORDS_PER_MAPPER_TASK_KEY, rowsPerMapper);

    Job job = Job.getInstance(conf, BulkDataGeneratorTool.class.getSimpleName() + " - " + table);

    job.setJarByClass(BulkDataGeneratorMapper.class);
    job.setInputFormatClass(BulkDataGeneratorInputFormat.class);

    // NOTE(review): this is applied to the caller's conf after the job has been created from it,
    // so presumably it does not affect the job's own configuration - confirm this is intended.
    HBaseConfiguration.addHbaseResources(conf);

    job.setMapperClass(BulkDataGeneratorMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    return job;
  }

  /** Returns Random output directory path where HFiles will be generated */
  protected Path generateOutputDirectory() {
    // OUTPUT_DIRECTORY_PREFIX already ends with a path separator.
    final String outputDirectory =
      OUTPUT_DIRECTORY_PREFIX + table + "-" + System.currentTimeMillis();
    return new Path(outputDirectory);
  }

  /**
   * This method parses the command line parameters into instance variables
   * @param conf configuration which receives the generic {@code -D} overrides
   * @param line parsed command line
   * @throws ParseException        if a table option is not of the form {@code key=value}
   * @throws IOException           if the generic options cannot be applied to {@code conf}
   * @throws NumberFormatException if a numeric parameter is not a valid number
   */
  protected void readCommandLineParameters(Configuration conf, CommandLine line)
    throws ParseException, IOException {
    final List<String> genericParameters = new ArrayList<>();

    // Parse the generic options
    for (Map.Entry<Object, Object> entry : line.getOptionProperties("D").entrySet()) {
      genericParameters.add("-D");
      genericParameters.add(entry.getKey() + "=" + entry.getValue());
    }

    final String[] genericArguments = genericParameters.toArray(new String[0]);
    logger.info("Parsed generic parameters: {}", Arrays.toString(genericArguments));

    // The parser is created only for its side effect of applying the overrides to conf.
    new GenericOptionsParser(conf, genericArguments);

    table = line.getOptionValue("table");

    if (line.hasOption("mapper-count")) {
      mapperCount = Integer.parseInt(line.getOptionValue("mapper-count"));
    }
    if (line.hasOption("split-count")) {
      splitCount = Integer.parseInt(line.getOptionValue("split-count"));
    }
    if (line.hasOption("rows-per-mapper")) {
      rowsPerMapper = Long.parseLong(line.getOptionValue("rows-per-mapper"));
    }

    deleteTableIfExist = line.hasOption("delete-if-exist");

    parseTableOptions(line);
  }

  /**
   * Parses the comma separated {@code key=value} pairs of the {@code table-options} parameter into
   * {@link #tableOptions}. Only the first {@code =} of a pair separates key and value, so values
   * may themselves contain {@code =}.
   * @param line parsed command line
   * @throws ParseException if a pair has no {@code =} or an empty key
   */
  private void parseTableOptions(final CommandLine line) throws ParseException {
    final String tableOptionsAsString = line.getOptionValue("table-options");
    if (StringUtils.isEmpty(tableOptionsAsString)) {
      return;
    }
    for (String tableOption : tableOptionsAsString.split(",")) {
      final int separatorIndex = tableOption.indexOf('=');
      if (separatorIndex <= 0) {
        throw new ParseException(
          "Invalid table option '" + tableOption + "', expected format is <key>=<value>");
      }
      tableOptions.put(tableOption.substring(0, separatorIndex),
        tableOption.substring(separatorIndex + 1));
    }
  }

  /** Returns the command line option for {@link BulkDataGeneratorTool} */
  protected Options getOptions() {
    final Options options = new Options();
    Option option =
      new Option("t", "table", true, "The table name for which data need to be generated.");
    options.addOption(option);

    option = new Option("d", "delete-if-exist", false,
      "If it's set, the table will be deleted if already exist.");
    options.addOption(option);

    option =
      new Option("mc", "mapper-count", true, "The number of mapper containers to be launched.");
    options.addOption(option);

    option = new Option("sc", "split-count", true,
      "The number of regions/pre-splits to be created for the table.");
    options.addOption(option);

    option =
      new Option("r", "rows-per-mapper", true, "The number of rows to be generated PER mapper.");
    options.addOption(option);

    option =
      new Option("o", "table-options", true, "Table options to be set while creating the table.");
    options.addOption(option);

    // Generic -D<property=value> overrides: without registering the option the properties read in
    // readCommandLineParameters are never populated.
    option = new Option("D", true, "Configuration property override, e.g. -Dproperty=value.");
    option.setArgs(2);
    option.setValueSeparator('=');
    option.setArgName("property=value");
    options.addOption(option);

    option = new Option("h", "help", false, "Show help message for the tool");
    options.addOption(option);

    return options;
  }

  /** Prints the usage of the tool together with some example invocations. */
  protected void printUsage() {
    final HelpFormatter helpFormatter = new HelpFormatter();
    helpFormatter.setWidth(120);
    final String helpMessageCommand = "hbase " + BulkDataGeneratorTool.class.getName();
    final String commandSyntax = helpMessageCommand + " <OPTIONS> [-D<property=value>]*";
    final String helpMessageSuffix = "Examples:\n" + helpMessageCommand
      + " -t TEST_TABLE -mc 10 -r 100 -sc 10\n" + helpMessageCommand
      + " -t TEST_TABLE -mc 10 -r 100 -sc 10 -d -o \"BACKUP=false,NORMALIZATION_ENABLED=false\"\n"
      + helpMessageCommand + " -t TEST_TABLE -mc 10 -r 100 -sc 10 -Dmapreduce.map.memory.mb=8192\n";
    helpFormatter.printHelp(commandSyntax, "", getOptions(), helpMessageSuffix);
  }
}
